{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# The prediction task is to determine whether a person makes over $50K a year."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This data was extracted from the 1994 Census Bureau database (Data Mining and Visualization, Silicon Graphics)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Source: `wget https://raw.githubusercontent.com/lemire/RealisticTabularDataSets/master/census-income/census-income_srt.csv.gz`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Description of fnlwgt (final weight)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The weights on the Current Population Survey (CPS) files are controlled to independent estimates of the civilian\n",
    "noninstitutional population of the US. These are prepared monthly for us by the Population Division here at the Census Bureau.\n",
    "We use 3 sets of controls. These are:\n",
    "\n",
    "1. A single cell estimate of the population 16+ for each state.\n",
    "2. Controls for Hispanic Origin by age and sex.\n",
    "3. Controls by Race, age and sex.\n",
    "\n",
    "We use all three sets of controls in our weighting program and \"rake\" through them 6 times so that by the end\n",
    "we come back to all the controls we used. The term estimate refers to population totals derived from CPS by\n",
    "creating \"weighted tallies\" of any specified socio-economic characteristics of the population.\n",
    "People with similar demographic characteristics should have similar weights. One important caveat applies:\n",
    "because the CPS sample is actually a collection of 51 state samples, each with its own probability of selection,\n",
    "the statement holds only within a state."
   ]
  },
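  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The raking step described above can be sketched in a few lines of plain Python. This is a minimal illustration, not the Census Bureau's weighting program: the function names `rake` and `weighted_tally`, the two toy control dimensions, and all numbers are hypothetical. Only the idea of cycling through the control sets a fixed number of times (6 in the text) comes from the description.\n",
    "\n",
    "```python\n",
    "def rake(weights, controls, passes=6):\n",
    "    # controls is a list of (labels, targets) pairs (hypothetical layout):\n",
    "    #   labels[i] is the category of unit i for that control set,\n",
    "    #   targets maps each category to its independent population total.\n",
    "    w = [float(x) for x in weights]\n",
    "    for _ in range(passes):\n",
    "        for labels, targets in controls:\n",
    "            # Current weighted total per category.\n",
    "            totals = {}\n",
    "            for wi, lab in zip(w, labels):\n",
    "                totals[lab] = totals.get(lab, 0.0) + wi\n",
    "            # Rescale each unit so its category hits the control total.\n",
    "            w = [wi * targets[lab] / totals[lab] for wi, lab in zip(w, labels)]\n",
    "    return w\n",
    "\n",
    "\n",
    "def weighted_tally(weights, labels, category):\n",
    "    # Sum of the weights of all units that share one characteristic.\n",
    "    return sum(wi for wi, lab in zip(weights, labels) if lab == category)\n",
    "\n",
    "\n",
    "# Toy sample of four respondents with made-up control totals.\n",
    "sex = ['F', 'F', 'M', 'M']\n",
    "age = ['young', 'old', 'young', 'old']\n",
    "controls = [\n",
    "    (sex, {'F': 60.0, 'M': 40.0}),\n",
    "    (age, {'young': 30.0, 'old': 70.0}),\n",
    "]\n",
    "\n",
    "raked = rake([1.0, 1.0, 1.0, 1.0], controls)\n",
    "print(weighted_tally(raked, sex, 'F'))\n",
    "print(weighted_tally(raked, age, 'young'))\n",
    "```\n",
    "\n",
    "After raking, each weighted tally matches its control total in this toy case: the tally for `'F'` is 60 and the tally for `'young'` is 30, up to floating-point rounding."
   ]
  },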
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Required packages and environment variables to set before running the code.\n",
    "\n",
    "import os\n",
    "import time\n",
    "import findspark\n",
    "os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'\n",
    "os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'\n",
    "#/opt/cloudera/parcels/SPARK2/lib/spark2\n",
    "findspark.init('/opt/cloudera/parcels/SPARK2/lib/spark2')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# path of the dataset = /home/nuveprotech/work/census-income_srt.csv.gz.\n",
    "# copy this dataset to hadoop file system (hdfs).\n",
    "# hadoop fs -copyFromLocal census-income_srt.csv.gz /user/nuveprotech/\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder.appName(\"pyspark-df-overview\").getOrCreate()"
   ]
  },
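  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# First read without a schema: all columns load as strings and header=True consumes the first data row as column names.\n",
    "df = spark.read.csv(\"hdfs://ip-10-0-1-20.ec2.internal:8020/user/nuveprotech/census-income_srt.csv.gz\", header=True)"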
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- 22: string (nullable = true)\n",
      " |--  Private: string (nullable = true)\n",
      " |--  33: string (nullable = true)\n",
      " |--  19: string (nullable = true)\n",
      " |--  Some college but no degree: string (nullable = true)\n",
      " |--  575: string (nullable = true)\n",
      " |--  College or university: string (nullable = true)\n",
      " |--  Divorced: string (nullable = true)\n",
      " |--  Retail trade: string (nullable = true)\n",
      " |--  Sales: string (nullable = true)\n",
      " |--  Asian or Pacific Islander: string (nullable = true)\n",
      " |--  All other: string (nullable = true)\n",
      " |--  Female: string (nullable = true)\n",
      " |--  No13: string (nullable = true)\n",
      " |--  Not in universe14: string (nullable = true)\n",
      " |--  Children or Armed Forces: string (nullable = true)\n",
      " |--  016: string (nullable = true)\n",
      " |--  017: string (nullable = true)\n",
      " |--  018: string (nullable = true)\n",
      " |--  Head of household: string (nullable = true)\n",
      " |--  Midwest: string (nullable = true)\n",
      " |--  Minnesota: string (nullable = true)\n",
      " |--  Child 18+ ever marr RP of subfamily: string (nullable = true)\n",
      " |--  Child 18 or older: string (nullable = true)\n",
      " |--  1513.26: string (nullable = true)\n",
      " |--  MSA to MSA: string (nullable = true)\n",
      " |--  Different county same state26: string (nullable = true)\n",
      " |--  Different county same state27: string (nullable = true)\n",
      " |--  No28: string (nullable = true)\n",
      " |--  No29: string (nullable = true)\n",
      " |--  230: string (nullable = true)\n",
      " |--  Not in universe31: string (nullable = true)\n",
      " |--  Philippines32: string (nullable = true)\n",
      " |--  Philippines33: string (nullable = true)\n",
      " |--  Philippines34: string (nullable = true)\n",
      " |--  Foreign born- U S citizen by naturalization: string (nullable = true)\n",
      " |--  036: string (nullable = true)\n",
      " |--  Not in universe37: string (nullable = true)\n",
      " |--  238: string (nullable = true)\n",
      " |--  47: string (nullable = true)\n",
      " |--  94: string (nullable = true)\n",
      " |--  - 50000.: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
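  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark.sql.types as t\n",
    "\n",
    "# Explicit schema with named, typed columns.\n",
    "# NOTE: these 15 fields follow the UCI Adult layout, while the file read above exposes 42 columns,\n",
    "# so the names and types here may not line up with the data (see the null results further down).\n",
    "census_schema = t.StructType([\n",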
    "      t.StructField('age', t.IntegerType(), True)\n",
    "    , t.StructField('workclass', t.StringType(), True)\n",
    "    , t.StructField('fnlwgt', t.IntegerType(), True)\n",
    "    , t.StructField('education', t.StringType(), True)\n",
    "    , t.StructField('education-num', t.IntegerType(), True)\n",
    "    , t.StructField('marital-status', t.StringType(), True)\n",
    "    , t.StructField('occupation', t.StringType(), True)\n",
    "    , t.StructField('relationship', t.StringType(), True)\n",
    "    , t.StructField('race', t.StringType(), True)\n",
    "    , t.StructField('sex', t.StringType(), True)\n",
    "    , t.StructField('capital-gain', t.DoubleType(), True)\n",
    "    , t.StructField('capital-loss', t.DoubleType(), True)\n",
    "    , t.StructField('hours-per-week', t.IntegerType(), True)\n",
    "    , t.StructField('native-country', t.StringType(), True)\n",
    "    , t.StructField('label', t.StringType(), True)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.csv(\"hdfs://ip-10-0-1-20.ec2.internal:8020/user/nuveprotech/census-income_srt.csv.gz\", header=True, schema=census_schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- age: integer (nullable = true)\n",
      " |-- workclass: string (nullable = true)\n",
      " |-- fnlwgt: integer (nullable = true)\n",
      " |-- education: string (nullable = true)\n",
      " |-- education-num: integer (nullable = true)\n",
      " |-- marital-status: string (nullable = true)\n",
      " |-- occupation: string (nullable = true)\n",
      " |-- relationship: string (nullable = true)\n",
      " |-- race: string (nullable = true)\n",
      " |-- sex: string (nullable = true)\n",
      " |-- capital-gain: double (nullable = true)\n",
      " |-- capital-loss: double (nullable = true)\n",
      " |-- hours-per-week: integer (nullable = true)\n",
      " |-- native-country: string (nullable = true)\n",
      " |-- label: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "199522"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.drop('fnlwgt')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- age: integer (nullable = true)\n",
      " |-- workclass: string (nullable = true)\n",
      " |-- education: string (nullable = true)\n",
      " |-- education-num: integer (nullable = true)\n",
      " |-- marital-status: string (nullable = true)\n",
      " |-- occupation: string (nullable = true)\n",
      " |-- relationship: string (nullable = true)\n",
      " |-- race: string (nullable = true)\n",
      " |-- sex: string (nullable = true)\n",
      " |-- capital-gain: double (nullable = true)\n",
      " |-- capital-loss: double (nullable = true)\n",
      " |-- hours-per-week: integer (nullable = true)\n",
      " |-- native-country: string (nullable = true)\n",
      " |-- label: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import count, avg, desc"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+------+------------------+\n",
      "|education|   qty|           avg_age|\n",
      "+---------+------+------------------+\n",
      "|        0|100684|  30.4666481268126|\n",
      "|        2|  8756| 42.35929648241206|\n",
      "|       26|  7887| 36.99987320907823|\n",
      "|       19|  5412| 33.11326681448632|\n",
      "|       29|  5105| 31.13379040156709|\n",
      "|       36|  4145| 38.13148371531966|\n",
      "|       34|  4025| 37.75453416149068|\n",
      "|       10|  3683| 40.27694814010318|\n",
      "|       16|  3445| 40.43570391872279|\n",
      "|       23|  3392| 39.56367924528302|\n",
      "|       12|  3340| 40.41017964071856|\n",
      "|       33|  3325| 39.17834586466165|\n",
      "|        3|  3195|40.583098591549295|\n",
      "|       35|  3168|40.339962121212125|\n",
      "|       38|  3003| 40.14085914085914|\n",
      "|       31|  2699|  39.9284920340867|\n",
      "|       32|  2398| 36.92577147623019|\n",
      "|       37|  2234|37.978961504028646|\n",
      "|        8|  2151|40.201301720130175|\n",
      "|       42|  1918| 34.06308654848801|\n",
      "+---------+------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Row count and average age per education value, most frequent first.\n",
    "(\n",
    "    df.groupBy('education')\n",
    "    .agg(\n",
    "        count('*').alias('qty'),\n",
    "        avg('age').alias('avg_age')\n",
    "    )\n",
    "    .orderBy(desc('qty'))\n",
    "    .show()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.createOrReplaceTempView(\"census\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+----+------------------+\n",
      "|education| qty|           avg_age|\n",
      "+---------+----+------------------+\n",
      "|       26|7887| 36.99987320907823|\n",
      "|       28|1661| 39.00842865743528|\n",
      "|        6| 441| 40.01133786848072|\n",
      "|       23|3392| 39.56367924528302|\n",
      "|       12|3340| 40.41017964071856|\n",
      "|       11| 637| 42.24960753532182|\n",
      "|       16|3445| 40.43570391872279|\n",
      "|        3|3195|40.583098591549295|\n",
      "|        4|1364|40.894428152492665|\n",
      "|        7| 731| 43.83036935704514|\n",
      "|       14| 932|37.832618025751074|\n",
      "|       44|1592| 34.88693467336683|\n",
      "|        9| 738|  42.0609756097561|\n",
      "|       43|1382|48.490593342981185|\n",
      "|       36|4145| 38.13148371531966|\n",
      "|       29|5105| 31.13379040156709|\n",
      "|       46|  36|31.305555555555557|\n",
      "|       15| 815| 36.32515337423313|\n",
      "|       22| 411|36.695863746958636|\n",
      "|       32|2398| 36.92577147623019|\n",
      "+---------+----+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Same aggregation through Spark SQL; without ORDER BY the row order is not guaranteed.\n",
    "s = spark.sql(\"\"\"\n",
    "SELECT \n",
    "    education, \n",
    "    COUNT(*) AS qty, \n",
    "    AVG(age) AS avg_age\n",
    "FROM census\n",
    "GROUP BY education\n",
    "\"\"\")\n",
    "s.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "def my_query(field):\n",
    "    # Row count and average age per value of `field`, most frequent first.\n",
    "    return (\n",
    "        df.groupBy(field)\n",
    "        .agg(\n",
    "            count('*').alias('qty'),\n",
    "            avg('age').alias('avg_age')\n",
    "        )\n",
    "        .orderBy(desc('qty'))\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------+------------------+\n",
      "|           workclass|   qty|           avg_age|\n",
      "+--------------------+------+------------------+\n",
      "|     Not in universe|100245|30.512973215621727|\n",
      "|             Private| 72027| 36.93916170325017|\n",
      "| Self-employed-no...|  8445| 44.92161042036708|\n",
      "|    Local government|  7784| 41.90506166495375|\n",
      "|    State government|  4227|39.911757747811684|\n",
      "| Self-employed-in...|  3265|45.864931087289435|\n",
      "|  Federal government|  2925|42.342564102564104|\n",
      "|        Never worked|   439|19.888382687927106|\n",
      "|         Without pay|   165| 38.68484848484849|\n",
      "+--------------------+------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# show() prints the table itself and returns None, so no print() wrapper is needed.\n",
    "my_query('workclass').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----+-------------+------------+------------+--------------+\n",
      "|summary| age|education-num|capital-gain|capital-loss|hours-per-week|\n",
      "+-------+----+-------------+------------+------------+--------------+\n",
      "|  count|   0|            0|           0|           0|             0|\n",
      "|   mean|null|         null|        null|        null|          null|\n",
      "| stddev|null|         null|        null|        null|          null|\n",
      "|    min|null|         null|        null|        null|          null|\n",
      "|    max|null|         null|        null|        null|          null|\n",
      "+-------+----+-------------+------------+------------+--------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select('age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week').describe().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------------------+------------------+------------------+\n",
      "|summary|          workclass|         education|    marital-status|\n",
      "+-------+-------------------+------------------+------------------+\n",
      "|  count|             199522|            199522|            199522|\n",
      "|   mean|               null|11.306517577009052| 55.42430408676737|\n",
      "| stddev|               null| 14.45422987661719|274.89468181014723|\n",
      "|    min| Federal government|                 0|                 0|\n",
      "|    max|        Without pay|                 9|              9999|\n",
      "+-------+-------------------+------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select('workclass', 'education', 'marital-status').describe().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
      "|marital-status_freqItems                                                                                                                                                                                                                                                                                                                                                                                                                             |\n",
      "+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
      "|[ 1250,  2000,  1655,  500,  800,  2290,  3700,  2300,  1757,  1100,  850,  0,  475,  985,  1400,  1706,  550,  1700,  3205,  1684,  2050,  3156,  625,  2500,  1750,  1864,  2373,  1150,  400,  1258,  2200,  1600,  700,  465,  3000,  5000,  1000,  1900,  750,  1300,  825,  435,  450,  2100,  600,  900,  1650,  1758,  525,  2400,  575,  2700,  1950,  950,  1050,  1500,  2045,  425,  650,  1800,  1200,  725,  2600,  1850,  2012,  4807]|\n",
      "+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.freqItems(['marital-status']).show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import isnan, when, count, col"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-------------+----------+--------------+--------------+\n",
      "|workclass|education-num|occupation|hours-per-week|native-country|\n",
      "+---------+-------------+----------+--------------+--------------+\n",
      "|   199522|       199522|    199522|        199522|        199522|\n",
      "+---------+-------------+----------+--------------+--------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# All columns\n",
    "# cols = df.columns\n",
    "# Selected columns\n",
    "cols = ['workclass', 'education-num', 'occupation', 'hours-per-week', 'native-country']\n",
    "\n",
    "# https://stackoverflow.com/a/44631639/570393\n",
    "df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols]).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total rows: 199522\n",
      "only complete rows: 0\n"
     ]
    }
   ],
   "source": [
    "# Total rows\n",
    "print('total rows: %s' % df.count())\n",
    "\n",
    "# Rows remaining after dropping records that contain any null value\n",
    "print('only complete rows: %s' % df.dropna().count())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "def show_df(df, field='occupation'):\n",
    "    df.groupBy(field).count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------+\n",
      "|          occupation| count|\n",
      "+--------------------+------+\n",
      "| College or unive...|  5687|\n",
      "|         High school|  6892|\n",
      "|     Not in universe|186943|\n",
      "+--------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "show_df(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------+\n",
      "|          occupation| count|\n",
      "+--------------------+------+\n",
      "| College or unive...|  5687|\n",
      "|         High school|  6892|\n",
      "|     Not in universe|186943|\n",
      "+--------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Fill with a fixed value\n",
    "new_df = df.fillna({'occupation': 'Other-service'})\n",
    "\n",
    "# Count \n",
    "show_df(new_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This is distributed\n",
    "df_spark = df.groupBy('workclass').agg(count('*').alias('counts')).orderBy('counts')\n",
    "# df_spark.show()\n",
    "\n",
    "# This is running on driver\n",
    "df_wk = df_spark.toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_wk.to_csv('output.csv')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
